DS 504 Big data analytics project2

  1. find intersection center
  2. weighted by dist from intersections
  3. transform plot to show stop/run pattern (log, 1/x, ...)
  4. remove noise
In [2]:
%matplotlib inline
from matplotlib import pyplot as plt
from math import sqrt
import pandas as pd
import numpy as np
import os, random, sys

# directory holding the per-intersection join-result files
JoinResPath = "./data/JoinResults/"
# every join-result file name, skipping hidden entries such as .DS_Store
Ilist = [fname for fname in os.listdir(JoinResPath) if not fname.startswith(".")]

# Read one join-result file. If Id is None, a random file is picked and the
# pick is retried until it holds more than LEAST trajectory points (LEAST is
# only honoured for random picks; an explicit Id is always returned as-is).
# Returns ((intersection_id, center_x, center_y), df) where df has columns
# 0: taxi id, 1: timestamp, 2: x, 3: y, sorted by (id, time).
def readJR(Id=None,LEAST = 100):
    #LEAST: threshold for number of trajectory points
    while True:
        # explicit id may be given as int or string; random pick otherwise
        # (was `if Id`, which wrongly treated Id=0 as "pick randomly")
        Iid = (str(Id) if type(Id)==int else Id) if Id is not None else random.choice(Ilist)
        with open(JoinResPath+Iid,"rb") as f:
            res = [line.strip().split(",") for line in f]
        df = pd.DataFrame(res[1:])
        # re-draw a too-small random pick; a loop instead of the previous
        # recursion, so a sparse dataset cannot exhaust the stack
        if Id is None and not len(df)>LEAST:
            continue
        break
    df[1],df[2], df[3] = pd.to_datetime(df[1]), df[2].astype(float), df[3].astype(float) 
    # res[0] is the header line: intersection id, center x, center y
    return (int(res[0][0]),float(res[0][1]),float(res[0][2])),df.sort_values([0,1])

# Scatter-plot a join result: the trajectory points in dataframe df plus the
# intersection center c, drawn as a red dot. Extra keyword args are handed
# straight to plt.scatter so callers can tune the point style globally.
def plotJR(c,df,**kargs):
    plt.scatter(df[2],df[3],**kargs)
    plt.scatter(float(c[1]),float(c[2]),color="r",s=30)
    x_min, x_max = df[2].min(), df[2].max()
    y_min, y_max = df[3].min(), df[3].max()
    # pad each axis by 10% of its span; fall back to a tiny span when the
    # extent is degenerate (all points share a coordinate)
    x_span = (x_max - x_min) or 0.001
    y_span = (y_max - y_min) or 0.001
    plt.xlim([x_min - .1*x_span, x_max + .1*x_span])
    plt.ylim([y_min - .1*y_span, y_max + .1*y_span])
    plt.title("Intersection:%d, # of points:%d"%(c[0],len(df)))

# Split a trajectory dataframe into separate trajectories. A new trajectory
# starts whenever the taxi id (column 0) changes or the time gap between two
# adjacent points exceeds the pd.DateOffset built from **rng (e.g. seconds=30).
# Returns a list of dataframes ordered by each trajectory's first timestamp.
def spliTraj(df,**rng):
    n = df.shape[0]
    if n == 0:
        return []  # previously raised NameError (loop variable unbound)
    gap = pd.DateOffset(**rng)  # hoisted: was rebuilt on every row
    split = [0]
    last, lastID = None, None
    for i in range(n):
        curr, currID = df.iloc[i,1], df.iloc[i,0]
        if last is not None and (curr > gap + last or currID != lastID):
            split.append(i)
        last, lastID = curr, currID
    split.append(n)
    pieces = [df.iloc[split[k-1]:split[k],:] for k in range(1,len(split))]
    # cmp= is Python-2 only; key= is equivalent here and portable
    return sorted(pieces, key=lambda p: p.iloc[0,1])
    
# Just like plotJR, but draws each trajectory as a line instead of a scatter.
# dfs is the list of dataframes produced by spliTraj (a single dataframe is
# also accepted); c is the intersection center, drawn as a red dot; kargs
# customizes the plt.plot calls in global scope.
def plotJR_line(c,dfs,**kargs):
    plt.scatter(c[1],c[2],color="r",s=30)
    if type(dfs)!=list:dfs=[dfs]
    for df in dfs: 
        # NOTE(review): t is never used -- this inner loop re-draws the same
        # trajectory once per distinct taxi id in df; presumably each split
        # piece holds a single id so it runs once, but confirm the intent.
        for t in set(df.iloc[:,0]):    
            if len(df)>1:plt.plot(df.iloc[:,2],df.iloc[:,3],**kargs)
    dfs=pd.concat(dfs)
    # pad axes by 10% of the span; 0.001 guards a degenerate (flat) extent
    xl,xu,yl,yu=dfs[2].min(),dfs[2].max(),dfs[3].min(),dfs[3].max() 
    xrng,yrng = xu-xl or 0.001,yu-yl or 0.001
    plt.xlim([xl-.1*xrng,xu+.1*xrng])
    plt.ylim([yl-.1*yrng,yu+.1*yrng])
    plt.title("Intersection:%d, # of points:%d"%(c[0],len(dfs)))

# Range query on trajectory timestamps: keep rows with
# time <= t < time + pd.DateOffset(**rng). With no rng the input is returned
# untouched. df may be a single dataframe (returns the filtered frame, or
# None when nothing matches) or a list of dataframes, e.g. the output of
# spliTraj (returns the list of non-empty filtered frames).
def timeRange(df,time,**rng):
    if len(rng)==0:return df
    if type(df)!=list: 
        # .ix was removed from pandas; iloc on the positional time column is
        # equivalent here (column labels 0..3 coincide with positions)
        ts = df.iloc[:,1]
        mask = (ts >= time) & (ts < time + pd.DateOffset(**rng))
        selected = df.loc[mask,:]
        return None if selected.empty else selected
    else:
        ret = []
        for piece in df:
            tmp = timeRange(piece,time,**rng)
            if tmp is not None:
                ret.append(tmp)
        return ret

# For each split trajectory, extract per-interval features: speed, unit
# direction vector and (when a center c is given) the summed squared distance
# of the interval's endpoints from that center.
# Returns a list of dicts {"time": point timestamps,
# "status": (n-1) x 3-or-4 array, columns [speed, dir_x, dir_y(, dist_sq)]}.
# verbose prints progress but makes the notebook cell messy.
def calc_features(dfs,verbose=0,c=None):
    features=[]
    count=0
    for df in dfs:
        if len(df)<2:continue  # need at least one interval
        # per-interval deltas of time / x / y (first diff row is NaN, dropped)
        tdiff,xdiff,ydiff = [df[i].diff().iloc[1:] for i in (1,2,3)]
        # interval distances
        d = np.sqrt(xdiff**2+ydiff**2)
        # unit direction vectors; a zero-length interval gives 0/0 -> NaN,
        # mapped to the (0, 0) "no movement" direction
        xd = [0 if np.isnan(i) else i for i in (np.divide(xdiff,d))]
        yd = [0 if np.isnan(i) else i for i in (np.divide(ydiff,d))]
        # speed in units/second; .dt.total_seconds() replaces
        # astype('timedelta64[s]'), whose meaning changed in pandas 2.x
        # (inf when two points share a timestamp; filtered by consumers)
        v = list(d/tdiff.dt.total_seconds())
        stacked = [v,xd,yd]
        # distances from the intersection center
        if c is not None:  # `if c` would also reject a falsy center tuple
            x_from_center, y_from_center = np.array(df[2]-c[1]), np.array(df[3]-c[2])
            dist_sq_from_center = x_from_center**2+y_from_center**2
            # interval weight = sum of its two endpoints' squared distances
            stacked.append(dist_sq_from_center[1:]+dist_sq_from_center[:-1])
        features.append({"time":list(df[1]),"status":np.vstack(stacked).T})
        if verbose:
            count+=1
            print("\r%d/%d" % (count,len(dfs)))
    return features
In [4]:
# plot scatter for one of the intersections

# some typical intersection ids:
# 3420,2076,3712,1138,4133,3693,4951,2000,1696,4597,2032,1722
# LEAST is ignored when an explicit id is given
center,points = readJR(1722,LEAST=50000)
# Python-2 print statement: shows the center and the points' bounding box
print(center[0],center[2],center[1]),(points[3].min(),points[2].min(),points[3].max(),points[2].max())
# google-maps link to eyeball the intersection (lat, lon order)
print "http://maps.google.com/maps?z=12&t=k&q=loc:%f+%f" % (center[2],center[1])
plt.figure(figsize=(8,5))
plotJR(center,points,s=1,color="b",alpha=.3)
plt.show()
(1722, 22.547561, 114.0678526) (22.545565, 114.06585699999999, 22.549558999999999, 114.06983200000001)
http://maps.google.com/maps?z=12&t=k&q=loc:22.547561+114.067853
In [5]:
# split first, save the result into "splitTJ", then plot trajectory lines

# Tuning variable: time gap above which a trajectory is split in two
SPLIT_INTERVAL = {"minutes":0,"hours":0,"seconds":30}

plt.figure(figsize=(8,5))
splitTJ = spliTraj(points,**SPLIT_INTERVAL)
plotJR_line(center,splitTJ,alpha=.2)
plt.show()
# cell output: total point count and number of split trajectories
reduce(lambda x,y:x+y,map(len,splitTJ)),len(splitTJ)
Out[5]:
(92251, 27496)
In [6]:
# first filter by time range (currently the whole day),
# then calculate speed and directions for the time intervals
# and collect them into the list "features"

# Tuning variable: consider the whole day or just a certain time range
TIME_RNG = {"hours":24}

# midnight-anchored start time; a 24h range keeps every trajectory
rngres = timeRange(splitTJ,pd.Timestamp("00:00:00"),**TIME_RNG)
features=calc_features(rngres,c=center)
"# of trajectories: %d"%len(rngres)
Out[6]:
'# of trajectories: 27496'
In [7]:
# filter the direction vectors, then plot them.
# cuml_dirs collects the filtered directions; (0,0) "no movement" vectors
# are removed, and sometimes (+/-1,0),(0,+/-1) need removing as well.

# alpha may need tuning

cuml_dirs = []
for i in features:
    # drop rows whose direction components sum to 0 (covers the (0,0) case)
    filter_ = i["status"][:,1:3].sum(axis=1)!=0.0
    # also drop axis-aligned unit vectors, whose component sum is +/-1
    filter_ &= abs(i["status"][:,1:3].sum(axis=1))!=1
    cuml_dirs.append(i["status"][filter_,1:3])
cuml_dirs = np.vstack(cuml_dirs)
plt.figure(figsize=(5,5))
plt.scatter(cuml_dirs[:,0],cuml_dirs[:,1],s=100,color="k",alpha=0.002)
plt.show()
print len(cuml_dirs)
31153
In [8]:
# cluster the direction vectors: KMeans when there are enough points,
# KMedoids otherwise. The cluster centers land in "directions" and the
# per-point labels in "labels" (not used yet, might be useful later).

# kMedoids sometimes fails, so in practice KMeans does the work

from sklearn.cluster import KMeans
from pyclust import KMedoids

# Tuning variable: number of direction clusters.
# Must be set manually -- neither algorithm picks its own k.
DIRECTION_NUM=4

if len(cuml_dirs)<1000:
    model = KMedoids(n_clusters=DIRECTION_NUM)
    model.fit(cuml_dirs)
    directions = model.centers_
else:
    model = KMeans(n_clusters=DIRECTION_NUM)
    model.fit(cuml_dirs)
    directions = model.cluster_centers_
labels = model.labels_
plt.figure(figsize=(5,5))
# enough distinct colors for any reasonable cluster count
colors="rybgkmc"*30
for i in list(set(model.labels_)):
    plt.scatter(cuml_dirs[labels==i,0],cuml_dirs[labels==i,1],
                s=100,alpha=0.01,color=colors[i])
# mark the cluster centers with black crosses
plt.scatter(directions[:,0],directions[:,1],marker="x",s=500,color="k")
plt.show()
del model
In [22]:
# Distribute each interval's speed-derived value over every second of the
# day it covers, appending into `monitor` (a list of 86400 per-second lists).
# `direction` should be one of the kMeans direction centers; pass None to
# skip the direction weighting.

# tuning parameters:
COS_POW = 4 # set 0 to turn off direction, higher more weighted by one direction
DIST_VAR = 1e-3 # set 1 to make it nearly no difference, less means more weighted by distance

def monitoring(monitor,feature,direction): 
    # second-of-day of each point; both operands are midnight-anchored, so
    # the date part cancels out of the timedelta and .seconds is the offset
    starts = [(i-pd.Timestamp("00:00:00")).seconds for i in feature["time"]]
    # speed damped exponentially by squared distance from the center
    y = feature["status"][:,0]/np.exp(feature["status"][:,3]/DIST_VAR)
    if direction is not None:
        # NOTE(review): /sqrt(2) looks like a normalisation for the scale of
        # the kMeans direction centers -- confirm against their magnitudes
        cosine = np.dot(feature["status"][:,1:3],direction)/sqrt(2)
        # clamp opposite-direction weights to zero; vectorised form of the
        # old Python-2-only map(lambda ...) list
        y = y*np.maximum(cosine**COS_POW, 0)
    for i in range(len(y)):
        for j in range(starts[i],starts[i+1]):
            monitor[j].append(y[i])

# for each direction, monitor every second's speed trend,
# saving the results in speed_monitors
speed_monitors=[]
for direction in directions:
    # one bucket of samples per second of the day
    speed_monitor=[[] for i in xrange(86400)]
    for feature in features:
        monitoring(speed_monitor,feature,direction)
    speed_monitors.append(speed_monitor)
print "Done!"
Done!
In [23]:
# calculate the speed trends
# remove inf and nan using func "wrapped"
# then plot speed trend in each direction

def wrapped(x):
    """Mean of the finite entries of x; 0 for an empty list (inf/nan dropped)."""
    if not x:
        return 0
    finite_vals = [v for v in x if np.isfinite(v)]
    return np.nanmean(finite_vals)
trends=[]
for i in xrange(len(directions)):
    trend = []
    for j in speed_monitors[i]:
        trend.append(wrapped(j))
    # squash speeds into [0,1) with x -> 1 - 1/(1 + 1e6*x) so stop/run
    # patterns become visible: 0 stays 0, large speeds approach 1
    trends.append(1-1/(1+1e6*np.array(trend)))
    plt.figure(figsize=(20,1))
    plt.plot(1-1/(1+1e6*np.array(trend)),color="k")
    # zoom into a 300-second window of the day
    plt.xlim([40000,40300])
    plt.title(directions[i])
    plt.show()
In [24]:
# pairwise correlation between the per-direction speed trends (cell output)
np.corrcoef(trends)
Out[24]:
array([[ 1.        , -0.05639556, -0.05720979,  0.99901896],
       [-0.05639556,  1.        ,  0.9995019 , -0.05744291],
       [-0.05720979,  0.9995019 ,  1.        , -0.05885099],
       [ 0.99901896, -0.05744291, -0.05885099,  1.        ]])
In [17]:
import numpy as np
import pylab as pl

# thresholds separating stopped / slow / running in the squashed trend
lower=.1
upper=.5
# seconds per figure window (86400/7200 = 12 figures cover the day)
plot_size=7200

# Plot the stop/run status of a trend as four stacked fill_between strips:
# black = no data (trend == 0), red = below lower, yellow = between the
# thresholds, green = at/above upper. Draws into the module-level `fig`.
def plot_status(trend,lower,upper):
    subN=4
    # precomputed masks replace the old condition-string + `exec`, which was
    # both a Python-2-only statement form and an eval smell
    masks = (trend==0,
             (trend<lower)&(trend!=0),
             (trend<upper)&(trend>=lower),
             trend>=upper)
    for i,(c,x) in enumerate(zip("kryg",masks)):
        fig.add_subplot(subN,1,i+1)
        plt.fill_between(np.arange(len(trend)),x,1e-99,color=c,alpha=1 if i!=0 else .5,edgecolor="none")
        plt.xlim([0,len(trend)]),plt.ylim([0,1])
        plt.yticks([])

# one figure per direction per plot_size-second window of the day
# (Python-2 integer division on the range bound)
for i in range(86400/plot_size):
    for j in range(4):
        fig = plt.figure(figsize=(20,2))
        plt.title(directions[j]),plt.xticks([]),plt.yticks([])
        plot_status(trends[j][i*plot_size:plot_size*(i+1)],lower,upper)
        plt.show()
    print i,"-"*20
0 --------------------
1 --------------------
2 --------------------
3 --------------------
4 --------------------
5 --------------------
6 --------------------
7 --------------------
8 --------------------
9 --------------------
10 --------------------
11 --------------------
In [46]:
# correlation of trends
# (rows, x) shape that tiles the day into x-second slices, dropping the
# incomplete tail; // keeps the row count an int on both Python 2 and 3
# (plain / would yield a float under Python 3 and break reshape)
reshapef=lambda x:((86400-86400%x)//x,x)

coefs=[]
# for each slice width, correlate adjacent day-slices of the first trend,
# then smooth the correlation series with a 30-sample moving average
for sec in range(100,600):
    reshaped = trends[0][:86400-86400%sec].reshape(reshapef(sec))
    coef=[]
    for i in xrange(len(reshaped)-1):
        coef.append(np.corrcoef(reshaped[i],reshaped[i+1])[0][1])
    coefs.append([np.mean(coef[i:i+30]) for i in xrange(len(coef)-29)])
In [47]:
# lengths of the smoothed correlation series, one per slice width
print [len(i) for i in coefs]
[834, 825, 817, 808, 800, 792, 785, 777, 770, 762, 755, 748, 741, 734, 727, 721, 714, 708, 702, 696, 690, 684, 678, 672, 666, 661, 655, 650, 645, 639, 634, 629, 624, 619, 614, 610, 605, 600, 596, 591, 587, 582, 578, 574, 570, 565, 561, 557, 553, 549, 546, 542, 538, 534, 531, 527, 523, 520, 516, 513, 510, 506, 503, 500, 496, 493, 490, 487, 484, 481, 478, 475, 472, 469, 466, 463, 460, 458, 455, 452, 450, 447, 444, 442, 439, 437, 434, 432, 429, 427, 424, 422, 420, 417, 415, 413, 410, 408, 406, 404, 402, 399, 397, 395, 393, 391, 389, 387, 385, 383, 381, 379, 377, 375, 373, 371, 370, 368, 366, 364, 362, 360, 359, 357, 355, 354, 352, 350, 348, 347, 345, 344, 342, 340, 339, 337, 336, 334, 333, 331, 330, 328, 327, 325, 324, 322, 321, 319, 318, 316, 315, 314, 312, 311, 310, 308, 307, 306, 304, 303, 302, 301, 299, 298, 297, 296, 294, 293, 292, 291, 290, 288, 287, 286, 285, 284, 283, 281, 280, 279, 278, 277, 276, 275, 274, 273, 272, 271, 270, 268, 267, 266, 265, 264, 263, 262, 261, 260, 259, 258, 258, 257, 256, 255, 254, 253, 252, 251, 250, 249, 248, 247, 246, 246, 245, 244, 243, 242, 241, 240, 240, 239, 238, 237, 236, 235, 235, 234, 233, 232, 231, 231, 230, 229, 228, 227, 227, 226, 225, 224, 224, 223, 222, 221, 221, 220, 219, 218, 218, 217, 216, 216, 215, 214, 214, 213, 212, 212, 211, 210, 210, 209, 208, 208, 207, 206, 206, 205, 204, 204, 203, 202, 202, 201, 201, 200, 199, 199, 198, 197, 197, 196, 196, 195, 195, 194, 193, 193, 192, 192, 191, 190, 190, 189, 189, 188, 188, 187, 187, 186, 186, 185, 184, 184, 183, 183, 182, 182, 181, 181, 180, 180, 179, 179, 178, 178, 177, 177, 176, 176, 175, 175, 174, 174, 173, 173, 172, 172, 171, 171, 170, 170, 170, 169, 169, 168, 168, 167, 167, 166, 166, 165, 165, 165, 164, 164, 163, 163, 162, 162, 162, 161, 161, 160, 160, 159, 159, 159, 158, 158, 157, 157, 157, 156, 156, 155, 155, 155, 154, 154, 153, 153, 153, 152, 152, 151, 151, 151, 150, 150, 150, 149, 149, 148, 148, 148, 147, 147, 147, 146, 146, 145, 145, 145, 144, 144, 144, 143, 143, 
143, 142, 142, 142, 141, 141, 141, 140, 140, 140, 139, 139, 139, 138, 138, 138, 137, 137, 137, 136, 136, 136, 135, 135, 135, 134, 134, 134, 133, 133, 133, 133, 132, 132, 132, 131, 131, 131, 130, 130, 130, 130, 129, 129, 129, 128, 128, 128, 127, 127, 127, 127, 126, 126, 126, 125, 125, 125, 125, 124, 124, 124, 124, 123, 123, 123, 122, 122, 122, 122, 121, 121, 121, 121, 120, 120, 120, 120, 119, 119, 119, 118, 118, 118, 118, 117, 117, 117, 117, 116, 116, 116, 116, 115, 115, 115, 115, 114, 114, 114, 114]
In [85]:
# grid of slice self-correlation matrices for widths 300..499 seconds
# (matshow of 1-corr: darker cells = more correlated slices)
fig = plt.figure(figsize=(20,300))
for sec in range(300,500):
    ax=fig.add_subplot(50,4,sec-299)
    reshaped = trends[0][:86400-86400%sec].reshape(reshapef(sec))
    ax.matshow(1-np.corrcoef(reshaped),cmap=plt.cm.gray)
    plt.title(sec)
In [ ]:
# zoom in on one slice width (445 s) and show its full correlation matrix
fig = plt.figure(figsize=(20,20))
sec=445
reshaped = trends[0][:86400-86400%sec].reshape(reshapef(sec))
plt.imshow(1-np.corrcoef(reshaped),cmap=plt.cm.gray)
plt.show()
In [ ]: